整个源码阅读基于ElasticSearch5.4.3
,后续会整理成一个系列去逐步探索Elasticsearch
的世界。
本文讨论了ES
源码中广泛应用到的的Future
模式。
1、 前言
首先分析客户端的RPC调用部分,使用了Future模式。
1.1 Future模式
分析代码前先简单介绍下Future模式
From 《Java 高并发程序设计》
Future模式,核心思想是异步调用,就是当调用一个方法时,这个函数可能执行得很慢,就需要等待,
但是有时候并不着急要这个结果,所以选择不去傻傻等待,而是做其他的事情。就好比”双十一”购物,你买到了想要的东西,
那么你不可能等待它到货,然后才做另一件事情,你可能想继续购物其他的商品。而对于已经购买得商品,会生成一个订单,
你只需要等待这个订单的快递通知(notify)就行了。
1.2 一个最基本的future模式代码片段
1 | class Demo{ |
1.3 线程池的中的Future(FutureTask)
类比的我们看一下线程池的future实现,场景是把一个回调任务交给线程池然后返回给你一个Future,当任务执行完成释放阻塞。
1 | public abstract class AbstractExecutorService implements ExecutorService { |
线程池的submit过程后续单独讲,继续跟进FutureTask的run,发现最终调用set方法并且遍历通知Waitor。
1 | public class FutureTask<V> implements RunnableFuture<V> { |
FutureTask是一种RunnableFuture,执行的是本地线程调用run()去同步的set(v),
而我们需要做的是要等待Netty的异步回调。so我们需要的是另外的Future实现。
2、 ES中的Future模式
2.1 场景
我们需要的场景是(先不讨论ES是怎么设计的,我们自己设计一个Future)
1) client -> get();//阻塞
2) Netty异步接收Response然后cast to V 调用requestId对应的回调responseHandler
3) 然后再调用responseHandler 的代理listener
4) listener的实现是一个Future 需要做set(v) then notify get()的过程。(get可能是多处,那么肯定是要共享锁)
看了上面四条,是不是脑子里马上想到以下几个方案可以实现(线程之间通信):
1) BlockingQueue,然后1对应的take(),3对应的发起put()。(源自2)
2) Condition,然后1对应的循环+条件+await(),3对应的发起signal()。(源自3)
3) 利用共享模式的Aqs,自己基于cas和等待队列实现。
4) 同步代码块
1 | public class A{ |
Es选择的是3。BaseFuture->Sync->Aqs。
当然了我们万能的guava的AbstractFuture同样给出了实现,
jdk的异步非阻塞的CompletableFuture也给出了实现。作者本人也基于CompletableFuture对源生es做了改造。es自己实现的原因见BaseFuture链接
2.2 源码
回到正题,我们继续看源码,Client向网关节点A发起异步请求,拿到ActionFuture(继承自Future)。代码如下
AbstractClient实现了Client 对应的所有对外提供的action的api,比如 index,get,search等
1 | interface Client{ |
- TransportActionNodeProxy专门负责代理请求对应的action(比如get search bulk等)
1 | public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent { |
code1最终会执行code3
code2返回的future在其调用get()之前都是非阻塞的,而code3只是发起了请求并把requestId注册回调函数,
只有handleResponse的时候调用该resquestId的回调才会set(v) 填充数据并释放阻塞。TransportService(Es通信过程中存请求上下文,以及RPC方法映射,即request对应action的地方),
的 clientHandlers就是异步回调池(根据requestId拿到回调执行),存放的就是requestId以及对应的
TransportResponseHandler,与code1的request是一一对应的。ActionListener就是声明的回调接口,onResponse最终会执行set(v),而onFailure会执行setException(e),把code2赋值。
code1a es5.x比2.x增加了Supplier,ActionListenerResponseHandler的回调声明不用实现newInstance了,
取而代之的是声明式的Supplier(获取只需要掉无参get()),由上方的GenericAction子类(专门声明action消息的)去new。
5.x大量使用了jdk8的特性。
1 | public class TransportService extends AbstractLifecycleComponent<TransportService> { |
- 我们java客户端调用的场景通常是这样的
1 | public class TestGet{ |
- 继续跟code4,5,6发现其实现是在父类BaseFuture,且具体实现其实是交给了内部类Sync(继承自AQS)
1 | public abstract class BaseFuture<V> implements Future<V> { |
- AQS park 和 unpark的过程
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer |
- 那么对应get就是最终会执行code7去阻塞(采用的AQS共享模式),而异步回调结果的onResponse和onFailure,
就都是对应到了 code9,code10,code11。code9就是设置aqs队列的状态为完成,
code10是针对共享模式的锁释放(比如读锁),code11是发现别的线程正在设置完成中(并发问题),等待别人设置完成(小概率)。
3、 总结
Future模式本身是一种被广泛运用的并发设计模式,ES自己通过AQS实现了BaseFuture并广泛的应用在了客户端Api的设计上。